{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Tce3stUlHN0L"
      },
      "source": [
        "##### Copyright 2020 The TensorFlow IO Authors."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "tuOe1ymfHZPu"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qFdPvlXBOdUN"
      },
      "source": [
        "# Avro Dataset API"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MfBg1C5NB3X0"
      },
      "source": [
        "<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://www.tensorflow.org/io/tutorials/avro\"><img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\" />View on TensorFlow.org</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/io/blob/master/docs/tutorials/avro.ipynb\"><img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\" />Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/tensorflow/io/blob/master/docs/tutorials/avro.ipynb\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" />View source on GitHub</a>\n",
        "  </td>\n",
        "      <td>\n",
        "    <a href=\"https://storage.googleapis.com/tensorflow_docs/io/docs/tutorials/avro.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\" />Download notebook</a>\n",
        "  </td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "xHxb-dlhMIzW"
      },
      "source": [
        "## Overview\n",
        "\n",
        "The objective of Avro Dataset API is to load Avro formatted data natively into TensorFlow as <a target=\"_blank\" href=\"https://www.tensorflow.org/api_docs/python/tf/data/Dataset\">TensorFlow dataset</a>. Avro is a data serialization system similiar to Protocol Buffers. It's widely used in Apache Hadoop where it can provide both a serialization format for persistent data, and a wire format for communication between Hadoop nodes. Avro data is a row-oriented, compacted binary data format. It relies on schema which is stored as a separate JSON file. For the spec of Avro format and schema declaration, please refer to <a target=\"_blank\" href=\"https://avro.apache.org/docs/current/spec.html\">the official manual</a>.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MUXex9ctTuDB"
      },
      "source": [
        "## Setup package\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "upgCc3gXybsA"
      },
      "source": [
        "### Install the required tensorflow-io package"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "uUDYyMZRfkX4"
      },
      "outputs": [],
      "source": [
        "!pip install tensorflow-io"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gjrZNJQRJP-U"
      },
      "source": [
        "### Import packages"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "metadata": {
        "id": "m6KXZuTBWgRm"
      },
      "outputs": [],
      "source": [
        "import tensorflow as tf\n",
        "import tensorflow_io as tfio\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "eCgO11GTJaTj"
      },
      "source": [
        "### Validate tf and tfio imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 4,
      "metadata": {
        "id": "dX74RKfZ_TdF"
      },
      "outputs": [],
      "source": [
        "print(\"tensorflow-io version: {}\".format(tfio.__version__))\n",
        "print(\"tensorflow version: {}\".format(tf.__version__))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "J0ZKhA6s0Pjp"
      },
      "source": [
        "## Usage"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4CfKVmCvwcL7"
      },
      "source": [
        "### Explore the dataset\n",
        "\n",
        "For the purpose of this tutorial, let's download the sample Avro dataset. \n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IGnbXuVnSo8T"
      },
      "source": [
        "Download a sample Avro file:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Tu01THzWcE-J"
      },
      "outputs": [],
      "source": [
        "!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro\n",
        "!ls -l train.avro"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IGnbXuVnSo8T"
      },
      "source": [
        "Download the corresponding schema file of the sample Avro file:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Tu01THzWcE-J"
      },
      "outputs": [],
      "source": [
        "!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc\n",
        "!ls -l train.avsc"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "z9GCyPWNuOm7"
      },
      "source": [
        "In the above example, a testing Avro dataset were created based on mnist dataset. The original mnist dataset in TFRecord format is generated from <a target=\"_blank\" href=\"https://www.tensorflow.org/datasets/api_docs/python/tfds/load\">TF named dataset</a>. However, the mnist dataset is too large as a demo dataset. For simplicity purpose, most of it were trimmed and first few records only were kept. Moreover, additional trimming was done for `image` field in original mnist dataset and mapped it to `features` field in Avro. So the avro file `train.avro` has 4 records, each of which has 3 fields: `features`, which is an array of int, `label`, an int or null, and `dataType`, an enum. To view the decoded `train.avro` (Note <a target=\"_blank\" href=\"https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro\">the original avro data file</a> is not human readable as avro is a compacted format):\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "upgCc3gXybsB"
      },
      "source": [
        "Install the required package to read Avro file:\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nS3eTBvjt-O4"
      },
      "outputs": [],
      "source": [
        "!pip install avro\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "upgCc3gXybsB"
      },
      "source": [
        "To read and print an Avro file in a human-readable format:\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nS3eTBvjt-O5"
      },
      "outputs": [],
      "source": [
        "from avro.io import DatumReader\n",
        "from avro.datafile import DataFileReader\n",
        "\n",
        "import json\n",
        "\n",
        "def print_avro(avro_file, max_record_num=None):\n",
        "    if max_record_num is not None and max_record_num <= 0:\n",
        "        return\n",
        "\n",
        "    with open(avro_file, 'rb') as avro_handler:\n",
        "        reader = DataFileReader(avro_handler, DatumReader())\n",
        "        record_count = 0\n",
        "        for record in reader:\n",
        "            record_count = record_count+1\n",
        "            print(record)\n",
        "            if max_record_num is not None and record_count == max_record_num:\n",
        "               break\n",
        "\n",
        "print_avro(avro_file='train.avro')\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "z9GCyPWNuOm7"
      },
      "source": [
        "And the schema of `train.avro` which is represented by `train.avsc` is a JSON-formatted file.\n",
        "To view the `train.avsc`: \n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nS3eTBvjt-O5"
      },
      "outputs": [],
      "source": [
        "def print_schema(avro_schema_file):\n",
        "    with open(avro_schema_file, 'r') as handle:\n",
        "        parsed = json.load(handle)\n",
        "    print(json.dumps(parsed, indent=4, sort_keys=True))\n",
        "\n",
        "print_schema('train.avsc')\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4CfKVmCvwcL7"
      },
      "source": [
        "### Prepare the dataset\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "z9GCyPWNuOm7"
      },
      "source": [
        "Load `train.avro` as TensorFlow dataset with Avro dataset API: \n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nS3eTBvjt-O5"
      },
      "outputs": [],
      "source": [
        "features = {\n",
        "    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),\n",
        "    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),\n",
        "    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)\n",
        "}\n",
        "\n",
        "schema = tf.io.gfile.GFile('train.avsc').read()\n",
        "\n",
        "dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],\n",
        "                                                              reader_schema=schema,\n",
        "                                                              features=features,\n",
        "                                                              shuffle=False,\n",
        "                                                              batch_size=3,\n",
        "                                                              num_epochs=1)\n",
        "\n",
        "for record in dataset:\n",
        "    print(record['features[*]'])\n",
        "    print(record['label'])\n",
        "    print(record['dataType'])\n",
        "    print(\"--------------------\")\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IF_kYz_o2DH4"
      },
      "source": [
        "The above example converts `train.avro` into tensorflow dataset. Each element of the dataset is a dictionary whose key is the feature name, value is the converted sparse or dense tensor. \n",
        "E.g, it converts `features`, `label`, `dataType` field to a VarLenFeature(SparseTensor), FixedLenFeature(DenseTensor), and FixedLenFeature(DenseTensor) respectively. Since batch_size is 3, it coerce 3 records from `train.avro` into one element in the result dataset.\n",
        "For the first record in `train.avro` whose label is null, avro reader replaces it with the specified default value(-100).\n",
        "In this example, there're 4 records in total in `train.avro`. Since batch size is 3, the result dataset contains 3 elements, last of which's batch size is 1. However user is also able to drop the last batch if the size is smaller than batch size by enabling `drop_final_batch`. E.g: \n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nS3eTBvjt-O5"
      },
      "outputs": [],
      "source": [
        "dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],\n",
        "                                                              reader_schema=schema,\n",
        "                                                              features=features,\n",
        "                                                              shuffle=False,\n",
        "                                                              batch_size=3,\n",
        "                                                              drop_final_batch=True,\n",
        "                                                              num_epochs=1)\n",
        "\n",
        "for record in dataset:\n",
        "    print(record)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IF_kYz_o2DH4"
      },
      "source": [
        "One can also increase num_parallel_reads to expediate Avro data processing by increasing avro parse/read parallelism.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nS3eTBvjt-O5"
      },
      "outputs": [],
      "source": [
        "dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],\n",
        "                                                              reader_schema=schema,\n",
        "                                                              features=features,\n",
        "                                                              shuffle=False,\n",
        "                                                              num_parallel_reads=16,\n",
        "                                                              batch_size=3,\n",
        "                                                              drop_final_batch=True,\n",
        "                                                              num_epochs=1)\n",
        "\n",
        "for record in dataset:\n",
        "    print(record)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IF_kYz_o2DH4"
      },
      "source": [
        "For detailed usage of `make_avro_record_dataset`, please refer to <a target=\"_blank\" href=\"https://www.tensorflow.org/io/api_docs/python/tfio/experimental/columnar/make_avro_record_dataset\">API doc</a>.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4CfKVmCvwcL7"
      },
      "source": [
        "### Train tf.keras models with Avro dataset\n",
        "\n",
        "Now let's walk through an end-to-end example of tf.keras model training with Avro dataset based on mnist dataset.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "z9GCyPWNuOm7"
      },
      "source": [
        "Load `train.avro` as TensorFlow dataset with Avro dataset API: \n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nS3eTBvjt-O5"
      },
      "outputs": [],
      "source": [
        "features = {\n",
        "    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)\n",
        "}\n",
        "\n",
        "schema = tf.io.gfile.GFile('train.avsc').read()\n",
        "\n",
        "dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],\n",
        "                                                              reader_schema=schema,\n",
        "                                                              features=features,\n",
        "                                                              shuffle=False,\n",
        "                                                              batch_size=1,\n",
        "                                                              num_epochs=1)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "z9GCyPWNuOm7"
      },
      "source": [
        "Define a simple keras model: \n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "metadata": {
        "id": "m6KXZuTBWgRm"
      },
      "outputs": [],
      "source": [
        "def build_and_compile_cnn_model():\n",
        "    model = tf.keras.Sequential()\n",
        "    model.compile(optimizer='sgd', loss='mse')\n",
        "    return model\n",
        "\n",
        "model = build_and_compile_cnn_model()\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4CfKVmCvwcL7"
      },
      "source": [
        "### Train the keras model with Avro dataset:\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "metadata": {
        "id": "m6KXZuTBWgRm"
      },
      "outputs": [],
      "source": [
        "model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IF_kYz_o2DH4"
      },
      "source": [
        "The avro dataset can parse and coerce any avro data into TensorFlow tensors, including records in records, maps, arrays, branches, and enumerations. The parsing information is passed into the avro dataset implementation as a map where \n",
        "keys encode how to parse the data \n",
        "values encode on how to coerce the data into TensorFlow tensors – deciding the primitive type (e.g. bool, int, long, float, double, string) as well as the tensor type (e.g. sparse or dense). A listing of TensorFlow's parser types (see Table 1) and the coercion of primitive types (Table 2) is provided. \n",
        "\n",
        "Table 1 the supported TensorFlow parser types:\n",
        "\n",
        "TensorFlow Parser Types|TensorFlow Tensors|Explanation\n",
        "----|----|------\n",
        "tf.FixedLenFeature([], tf.int32)|dense tensor|Parse a fixed length feature; that is all rows have the same constant number of elements, e.g. just one element or an array that has always the same number of elements for each row \n",
        "tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) |sparse tensor|Parse a sparse feature where each row has a variable length list of indices and values. The 'index_key' identifies the indices. The 'value_key' identifies the value. The 'dtype' is the data type. The 'size' is the expected maximum index value for each index entry\n",
        "tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) |sparse tensor|Parse a variable length feature; that means each data row can have a variable number of elements, e.g. the 1st row has 5 elements, the 2nd row has 7 elements\n",
        "\n",
        "Table 2 the supported conversion from Avro types to TensorFlow's types:\n",
        "\n",
        "Avro Primitive Type|TensorFlow Primitive Type\n",
        "----|----\n",
        "boolean: a binary value|tf.bool\n",
        "bytes: a sequence of 8-bit unsigned bytes|tf.string\n",
        "double: double precision 64-bit IEEE floating point number|tf.float64\n",
        "enum: enumeration type|tf.string using the symbol name\n",
        "float: single precision 32-bit IEEE floating point number|tf.float32\n",
        "int: 32-bit signed integer|tf.int32\n",
        "long: 64-bit signed integer|tf.int64\n",
        "null: no value|uses default value\n",
        "string: unicode character sequence|tf.string\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IF_kYz_o2DH4"
      },
      "source": [
        "A comprehensive set of examples of Avro dataset API is provided within <a target=\"_blank\" href=\"https://github.com/tensorflow/io/blob/master/tests/test_parse_avro.py#L437\">the tests</a>.\n"
      ]
    }
  ],
  "metadata": {
    "accelerator": "GPU",
    "colab": {
      "collapsed_sections": [
        "Tce3stUlHN0L"
      ],
      "name": "avro.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
